package org.example.consumer;

import com.google.gson.JsonObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.example.bean.AppCtx;
import org.example.framework.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
//import redis.clients.jedis.JedisPool;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Runnable that repeatedly polls a {@link KafkaConsumer} and processes the returned records
 * until {@link #shutdown()} is called.
 *
 * <p>The wrapped consumer is used only from the thread executing {@link #run()}, with the
 * exception of {@link KafkaConsumer#wakeup()}, which {@link #shutdown()} invokes to interrupt
 * a blocking poll. The consumer is always closed when {@link #run()} exits, whether it exits
 * because of a requested shutdown or because of an unexpected exception.
 */
public class KafkaConsumerWorker implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerWorker.class);
    private final Random random = new Random();
    private final KafkaConsumer<String, String> consumer;
    /** Set to {@code true} by {@link #shutdown()}; checked by the poll loop. */
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    // This class is not a Spring component, so Spring beans cannot be injected into it;
    // the bean is looked up by name through AppCtx instead.
    @SuppressWarnings("unchecked")
    private final List<JdbcTemplate> clickHouseDataSourcePool = (List<JdbcTemplate>) AppCtx.getBean("clickHouseDataSourcePool");

    /**
     * Creates a worker around the given consumer.
     *
     * @param consumer the consumer to poll; it is closed by this worker when {@link #run()} exits
     */
    public KafkaConsumerWorker(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    /**
     * Polls the consumer in a loop until shutdown is requested, then closes the consumer.
     *
     * @throws WakeupException if the consumer is woken up without {@link #shutdown()} having
     *     been called
     */
    @Override
    public void run() {
        try {
            while (!isClosed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> item : records) {
                    LOGGER.info("thread={}, key={}, value={}", Thread.currentThread().getId(), item.key(), item.value());
                    JsonObject jObject = JsonUtil.fromJson(item.value(), JsonObject.class);
                    LOGGER.info("thread={}, jObject={}", Thread.currentThread().getId(), jObject);
                    // Skip records that are missing required fields.
                    if (!checkField(jObject)) {
                        continue;
                    }
                    // do your business here
                }
            }
        } catch (WakeupException e) {
            // shutdown() sets isClosed before calling wakeup(), so a WakeupException seen while
            // isClosed is true is the normal shutdown signal and is swallowed. A wakeup without
            // a shutdown request is unexpected and is propagated.
            if (!isClosed.get()) {
                throw e;
            }
        } finally {
            // Close on every exit path so the consumer's resources are released.
            LOGGER.info("shutting down consumer.");
            consumer.close();
        }
    }

    /**
     * Checks whether the record contains all required fields.
     *
     * <p>Currently a placeholder that accepts every record.
     *
     * @param jObject the parsed record value
     * @return {@code true} if the record should be processed
     */
    private boolean checkField(JsonObject jObject) {
        return true;
    }

    /**
     * Requests termination of the poll loop. May be called from a thread other than the one
     * executing {@link #run()}: the flag is set first, then the consumer is woken up so that a
     * blocking poll aborts with a {@link WakeupException}.
     */
    public void shutdown() {
        isClosed.set(true);
        consumer.wakeup();
    }
}
